package com.demo.flink;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;
import java.net.URL;

/**
 * @author : pengjie
 * @PackageName : com.demo.flink
 * @Description : Flink source that continuously emits randomly generated "pv" UserBehavior events
 * @email : 627799251@qq.com
 * @Date : 2019/1/29 17:58
 */
public class UserBehaviorGenerator implements SourceFunction<UserBehavior> {
    /** Pause between two emitted events, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1L;

    /**
     * Loop guard for {@link #run(SourceContext)}; cleared by {@link #cancel()}.
     * Declared {@code volatile} so that a write made by the cancelling thread
     * is visible to the thread executing {@code run}.
     */
    private volatile boolean running = true;

    /**
     * Emits synthetic {@link UserBehavior} records until {@link #cancel()} is called.
     *
     * <p>Each record gets a random {@code userId} in {@code [0, 1000000)}, a random
     * {@code itemId} and {@code categoryId} in {@code [0, 100)}, the behavior
     * {@code "pv"}, and the current wall-clock time in milliseconds as its timestamp.
     * The same timestamp value is attached to the emitted element.
     *
     * @param sourceContext context used to emit the generated records
     * @throws Exception if emitting fails or the thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<UserBehavior> sourceContext) throws Exception {
        while (running) {
            // Read the clock once so the record's own field and the element
            // timestamp handed to the context can never disagree.
            long now = System.currentTimeMillis();

            UserBehavior behavior = new UserBehavior();
            behavior.userId = (long) (Math.random() * 1000000);
            behavior.itemId = (long) (Math.random() * 100);
            behavior.categoryId = (int) (Math.random() * 100);
            behavior.behavior = "pv";
            behavior.timestamp = now;

            sourceContext.collectWithTimestamp(behavior, now);

            // Throttle the generator; an interrupt here propagates to the caller.
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /**
     * Requests termination of the emit loop in {@link #run(SourceContext)}.
     * The loop exits after finishing its current iteration.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
